package com.atguigu.cdc;

// Flink CDC study: like Maxwell and Canal, CDC captures changes to database tables in real time.
// Flink CDC connectors documentation: https://github.com/ververica/flink-cdc-connectors

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Demo job that uses Flink CDC (DataStream API) to read change events from a MySQL table and
 * print them to stdout.
 *
 * <p>The MySQL connection settings default to the values of the course environment. They can be
 * overridden through the environment variables {@code MYSQL_HOST}, {@code MYSQL_PORT},
 * {@code MYSQL_USER} and {@code MYSQL_PASSWORD}, so credentials need not be edited in source.
 */
public class FlinkCDC_DStream {

    /** HDFS directory used by the filesystem state backend for checkpoint data. */
    private static final String CHECKPOINT_DIR = "hdfs://hadoop104:8020/flinkCDC";

    /** Interval between checkpoints, in milliseconds (5 s). */
    private static final long CHECKPOINT_INTERVAL_MS = 5000L;

    /** A checkpoint not completed within this many milliseconds is aborted (1 min). */
    private static final long CHECKPOINT_TIMEOUT_MS = 60000L;

    /** Database whose changes are captured. */
    private static final String DATABASE = "gmall1021_realtime";

    /** Table to capture, qualified as "database.table". */
    private static final String TABLE = DATABASE + ".t_user";

    /**
     * Entry point: configures checkpointing and the MySQL CDC source, prints every change event
     * and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Set the HDFS user first, before the HDFS state backend or any other Hadoop-related
        // setup happens, so it is already in place whenever Hadoop code reads it.
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        configureCheckpointing(env);

        env
                .addSource(buildMySqlSource())
                .print()
                // Single sink instance keeps the printed messages in order.
                .setParallelism(1);

        env.execute("FlinkCDC_DStream");
    }

    /**
     * Enables exactly-once checkpointing. The source's binlog position is saved in each
     * checkpoint, so a job restored from a checkpoint resumes where it left off. Without this,
     * it would re-read history from the configured startup position.
     *
     * @param env the execution environment to configure
     */
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        // Take a checkpoint every 5 s with exactly-once semantics.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Abort checkpoints that take longer than 1 min.
        checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        // Restart at most 2 times, waiting 2 s between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000L));
        // Keep the last checkpoint when the job is cancelled so it can be restored later.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Store checkpoint data on HDFS.
        env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    }

    /**
     * Builds the MySQL CDC source for {@link #TABLE}. Each change record is converted to a
     * String.
     *
     * @return the configured source function
     * @throws NumberFormatException if {@code MYSQL_PORT} is set but is not an integer
     */
    private static SourceFunction<String> buildMySqlSource() {
        return MySQLSource.<String>builder()
                .hostname(envOrDefault("MYSQL_HOST", "hadoop104"))
                .port(Integer.parseInt(envOrDefault("MYSQL_PORT", "3306")))
                // Several databases may be listed here; only one is captured in this demo.
                .databaseList(DATABASE)
                // Tables are qualified with their database name because monitored databases
                // may contain tables with the same name. Only this table is captured.
                .tableList(TABLE)
                .username(envOrDefault("MYSQL_USER", "root"))
                .password(envOrDefault("MYSQL_PASSWORD", "000000"))
                // initial:  on first start read the existing (historical) table data as well.
                // earliest: read from the beginning of the binlog.
                // latest:   read only from the end of the binlog.
                .startupOptions(StartupOptions.initial())
                // Converts each SourceRecord to a String.
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
    }

    /**
     * Looks up an environment variable, falling back to a default.
     *
     * @param name the environment variable to read
     * @param fallback the value returned when the variable is unset or blank
     * @return the variable's value, or {@code fallback}
     */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }
}
